package com.xxl.job.core.thread;

import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.util.FileUtil;
import com.xxl.job.core.util.JdkSerializeTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Reports job execution results ("callbacks") from the executor back to the admin side.
 *
 * <p>Two daemon threads are started by {@link #start()}:
 * <ul>
 *   <li>a callback thread that takes results from an in-memory queue and sends them in batches
 *       through the {@link AdminBiz} clients returned by
 *       {@code XxlJobExecutor.getAdminBizList()};</li>
 *   <li>a retry thread that periodically re-sends batches which could not be delivered and were
 *       therefore serialized to files below the {@code callbacklog} directory.</li>
 * </ul>
 *
 * <p>Created by xuxueli on 16/7/22.
 */
public class TriggerCallbackThread {

  private static final Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class);

  private static final TriggerCallbackThread instance = new TriggerCallbackThread();

  /**
   * @return the shared instance whose queue is fed by {@link #pushCallBack(HandleCallbackParam)}
   */
  public static TriggerCallbackThread getInstance() {
    return instance;
  }

  /**
   * job results callback queue
   */
  private final LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();

  /**
   * Enqueues a job result on the shared instance's queue so the callback thread can report it.
   *
   * @param callback the job result to report
   */
  public static void pushCallBack(HandleCallbackParam callback) {
    getInstance().callBackQueue.add(callback);
    logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());
  }

  /**
   * callback thread
   */
  private Thread triggerCallbackThread;
  private Thread triggerRetryCallbackThread;
  /** Set by {@link #toStop()}; both worker loops poll it to know when to exit. */
  private volatile boolean toStop = false;

  /**
   * Starts the callback thread and the retry thread (both daemon threads).
   *
   * <p>Does nothing except log a warning when {@code XxlJobExecutor.getAdminBizList()} is
   * {@code null}, i.e. there is no admin to call back.
   */
  public void start() {

    // valid
    if (XxlJobExecutor.getAdminBizList() == null) {
      logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
      return;
    }

    // callback
    triggerCallbackThread = new Thread(new Runnable() {

      @Override
      public void run() {

        // normal callback
        while (!toStop) {
          try {
            // take() blocks until an element is available, so it never yields null
            HandleCallbackParam callback = getInstance().callBackQueue.take();

            // batch everything else that is already queued into the same request;
            // the element taken above is appended after the drained ones
            List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
            getInstance().callBackQueue.drainTo(callbackParamList);
            callbackParamList.add(callback);

            // callback, will retry if error
            doCallback(callbackParamList);
          } catch (Exception e) {
            // the interrupt sent by toStop() is expected; anything else is worth reporting
            if (!toStop) {
              logger.error(e.getMessage(), e);
            }
          }
        }

        // last callback: flush whatever was queued while the loop was shutting down
        try {
          List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
          getInstance().callBackQueue.drainTo(callbackParamList);
          if (!callbackParamList.isEmpty()) {
            doCallback(callbackParamList);
          }
        } catch (Exception e) {
          // toStop is always true at this point, so log unconditionally; guarding on !toStop
          // here would hide every failure of the final flush
          logger.error(e.getMessage(), e);
        }
        logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy.");

      }
    });
    triggerCallbackThread.setDaemon(true);
    triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
    triggerCallbackThread.start();

    // retry
    triggerRetryCallbackThread = new Thread(new Runnable() {
      @Override
      public void run() {
        while (!toStop) {
          try {
            retryFailCallbackFile();
          } catch (Exception e) {
            if (!toStop) {
              logger.error(e.getMessage(), e);
            }
          }
          try {
            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
          } catch (InterruptedException e) {
            // interrupted by toStop(): the loop condition ends the thread. The interrupt flag
            // is deliberately not restored, otherwise an interrupt that arrives while toStop
            // is still false would make every following sleep fail immediately.
            if (!toStop) {
              logger.error(e.getMessage(), e);
            }
          }
        }
        logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy.");
      }
    });
    triggerRetryCallbackThread.setDaemon(true);
    triggerRetryCallbackThread.start();

  }

  /**
   * Signals both worker threads to stop, interrupts them and waits until they have terminated.
   *
   * <p>If the calling thread is interrupted while waiting, the interruption is logged, the
   * remaining thread is still waited for, and the caller's interrupt status is restored before
   * returning.
   */
  public void toStop() {
    toStop = true;
    boolean interrupted = false;

    // stop callback, interrupt and wait
    if (triggerCallbackThread != null) {    // support empty admin address
      triggerCallbackThread.interrupt();
      try {
        triggerCallbackThread.join();
      } catch (InterruptedException e) {
        interrupted = true;
        logger.error(e.getMessage(), e);
      }
    }

    // stop retry, interrupt and wait
    if (triggerRetryCallbackThread != null) {
      triggerRetryCallbackThread.interrupt();
      try {
        triggerRetryCallbackThread.join();
      } catch (InterruptedException e) {
        interrupted = true;
        logger.error(e.getMessage(), e);
      }
    }

    // join() clears the interrupt status when it throws; hand it back to the caller
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * do callback, will retry if error
   *
   * <p>Tries each admin client in turn and stops at the first one that answers with
   * {@code ReturnT.SUCCESS_CODE}. If none succeeds, the batch is written to a fail-callback file
   * so the retry thread can send it again later.
   *
   * @param callbackParamList batch of job results to report; a {@code null} or empty batch is
   *                          ignored
   */
  private void doCallback(List<HandleCallbackParam> callbackParamList) {
    if (callbackParamList == null || callbackParamList.isEmpty()) {
      return;
    }

    boolean callbackRet = false;
    // callback, will retry if error
    for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList()) {
      try {
        ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
        if (callbackResult != null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
          callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
          callbackRet = true;
          break;
        } else {
          callbackLog(callbackParamList,
              "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
        }
      } catch (Exception e) {
        callbackLog(callbackParamList,
            "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
      }
    }
    if (!callbackRet) {
      appendFailCallbackFile(callbackParamList);
    }
  }

  /**
   * callback log
   *
   * <p>Writes {@code logContent} once per entry of the batch. Before each write the current
   * {@link XxlJobContext} is replaced by one that carries the log file name derived from the
   * entry's log time and log id.
   *
   * @param callbackParamList batch whose entries should receive the message
   * @param logContent        message to log
   */
  private void callbackLog(List<HandleCallbackParam> callbackParamList, String logContent) {
    for (HandleCallbackParam callbackParam : callbackParamList) {
      String logFileName = XxlJobFileAppender.makeLogFileName(
          new Date(callbackParam.getLogDateTim()), callbackParam.getLogId());
      XxlJobContext.setXxlJobContext(new XxlJobContext(
          -1,
          null,
          logFileName,
          -1,
          -1));
      XxlJobHelper.log(logContent);
    }
  }

  // ---------------------- fail-callback file ----------------------

  // NOTE(review): both values are computed once, when this class is initialized, from whatever
  // XxlJobFileAppender.getLogPath() returns at that moment.
  private static final String failCallbackFilePath = XxlJobFileAppender.getLogPath()
      .concat(File.separator).concat("callbacklog").concat(File.separator);
  private static final String failCallbackFileName = failCallbackFilePath.concat("xxl-job-callback-{x}")
      .concat(".log");

  /**
   * Serializes an undelivered batch into a new file below the fail-callback directory.
   *
   * <p>The file name is based on the current time in milliseconds. If that name is taken, up to
   * 100 alternatives with a numeric suffix are probed; the last candidate is used even if it
   * also exists.
   *
   * @param callbackParamList batch to persist; a {@code null} or empty batch is ignored
   */
  private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList) {
    // valid
    if (callbackParamList == null || callbackParamList.size() == 0) {
      return;
    }

    // append file
    byte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList);

    File callbackLogFile = new File(
        failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));
    if (callbackLogFile.exists()) {
      for (int i = 0; i < 100; i++) {
        callbackLogFile = new File(failCallbackFileName.replace("{x}",
            String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i))));
        if (!callbackLogFile.exists()) {
          break;
        }
      }
    }
    FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);
  }

  /**
   * Re-sends every batch stored below the fail-callback directory.
   *
   * <p>Each file is read, deleted and handed to {@link #doCallback(List)}, which writes a fresh
   * file if delivery fails again. Empty files and files whose content does not yield a batch are
   * deleted without a callback. A runtime failure while handling one file is logged and does not
   * prevent the remaining files from being processed.
   */
  private void retryFailCallbackFile() {

    // valid
    File callbackLogPath = new File(failCallbackFilePath);
    if (!callbackLogPath.exists()) {
      return;
    }
    if (callbackLogPath.isFile()) {
      callbackLogPath.delete();
    }

    // listFiles() returns null when the path is not a directory (including the case where it
    // was just deleted above) or when an I/O error occurs, so one snapshot covers every check
    File[] callbackLogFiles = callbackLogPath.listFiles();
    if (callbackLogFiles == null || callbackLogFiles.length == 0) {
      return;
    }

    // load and clear file, retry
    for (File callbackLogFile : callbackLogFiles) {
      try {
        byte[] callbackParamList_bytes = FileUtil.readFileContent(callbackLogFile);

        // avoid empty file
        if (callbackParamList_bytes == null || callbackParamList_bytes.length < 1) {
          callbackLogFile.delete();
          continue;
        }

        // the file is only ever written by appendFailCallbackFile, which stores a
        // List<HandleCallbackParam>, hence the unchecked cast
        @SuppressWarnings("unchecked")
        List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(
            callbackParamList_bytes, List.class);

        callbackLogFile.delete();

        // presumably deserialize yields null for unreadable content — TODO confirm; either
        // way there is nothing to send for a null or empty batch
        if (callbackParamList == null || callbackParamList.isEmpty()) {
          logger.warn(">>>>>>>>>>> xxl-job, fail-callback file has no callback data, dropped, file:{}",
              callbackLogFile.getName());
          continue;
        }

        doCallback(callbackParamList);
      } catch (RuntimeException e) {
        // keep going: one bad file must not stall the retry of all the others
        logger.error(">>>>>>>>>>> xxl-job, retry fail-callback file error, file:{}",
            callbackLogFile.getName(), e);
      }
    }

  }

}
